{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 10: SparkStreaming\n",
    "\n",
    "This example shows the original implementation of streaming in Spark, the _Spark streaming_ capability that is based on the `RDD` API. We construct a simple \"word count\" server. This example watches a directory for new files and reads them as they arrive. The corresponding program version of this example, [SparkStreaming10.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/SparkStreaming10.scala), supports this input source and a second option, input from a socket. See the [Tutorial.markdown](https://github.com/deanwampler/spark-scala-tutorial/blob/master/Tutorial.markdown), for details."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The newer streaming module is called _Structured Streaming_. It is based on the `Dataset` API, for better performance and convenience. It has supports much lower-latency processing. Examples of this API are TBD here, but see the [Apache Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for more information."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Watching a directory for new files supports a workflow where some process outputs new files to a \"staging\" directory where this job will do subsequent processing.\n",
    "\n",
    "Note that Spark Streaming does not use the `_SUCCESS` marker file we mentioned in an earlier notebook for batch processing, in part because that mechanism can only be used once *all* files are written to the directory. Hence, Spark can't know when writing the file has actually completed. This means you should only use this ingestion mechanism with files that \"appear instantly\" in the directory, i.e., through renaming from another location in the file system.\n",
    "\n",
    "For the example, a temporary directory is created and a second process writes the user-specified data file(s) (default: Enron emails) to a temporary directory every second. `SparkStreaming10` does *Word Count* on the data. Hence, the data would eventually repeat, but for convenience, we also stop after 200 iterations (the number of email files)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import java.io.File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dataSource = ../data/enron-spam-ham\n",
       "watchedDirectory = tmp/streaming-input\n",
       "outputPathRoot = streaming-output\n",
       "outputPath = streaming-output/wc-streaming\n",
       "iterations = 200\n",
       "sleepIntervalMillis = 1000\n",
       "batchSeconds = 2\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "2"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val dataSource = new File(\"../data/enron-spam-ham\")\n",
    "val watchedDirectory = new File(\"tmp/streaming-input\")\n",
    "val outputPathRoot = new File(\"streaming-output/\")\n",
    "outputPathRoot.mkdirs()\n",
    "val outputPath = new File(outputPathRoot, \"wc-streaming\")\n",
    "val iterations = 200            // Terminate after N iterations\n",
    "val sleepIntervalMillis = 1000  // How often to wait between writes of files to the directory\n",
    "val batchSeconds = 2            // Size of batch intervals"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A function to delete a file or a directory and its contents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "rmrf: (root: java.io.File)Unit\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "def rmrf(root: File): Unit = {\n",
    "  if (root.isFile) root.delete()\n",
    "  else if (root.exists) {\n",
    "    root.listFiles.foreach(rmrf)\n",
    "    root.delete()\n",
    "  }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use it to remove the watched directory, if one exists from a previous run. Then recreate it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmrf(watchedDirectory)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "true"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "watchedDirectory.mkdirs()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We need a second process or dedicated thread to write new files to the watch directory. To support we'll insert here a striped-down version of [util.streaming.DataDirectoryServer.scala](https://github.com/deanwampler/spark-scala-tutorial/blob/master/src/main/scala/sparktutorial/util/streaming/DataDirectoryServer.scala) in the application version of the tutorial. It runs its logic in a separate thread. It Serves data to be used by this notebook by periodically writing a new file to a watched directory, as discussed below."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class DataServerError\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "case class DataServerError(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class DataDirectoryServer\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "makePath: (pathString: String)java.nio.file.Path <and> (file: java.io.File)java.nio.file.Path <and> (parent: java.nio.file.Path, name: String)java.nio.file.Path\n",
       "makePath: (pathString: String)java.nio.file.Path <and> (file: java.io.File)java.nio.file.Path <and> (parent: java.nio.file.Path, name: String)java.nio.file.Path\n",
       "makePath: (pathString: String)java.nio.file.Path <and> (file: java.io.File)java.nio.file.Path <and> (parent: java.nio.file.Path, name: String)java.nio.file.Path\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "import java.nio.file.{Files, FileSystems, Path}\n",
    "import java.nio.file.attribute.BasicFileAttributes\n",
    "import java.util.function.BiPredicate\n",
    "import scala.util.control.NonFatal\n",
    "import scala.collection.JavaConverters._\n",
    "\n",
    "def makePath(pathString: String): Path = FileSystems.getDefault().getPath(pathString)\n",
    "def makePath(file: java.io.File): Path = makePath(file.getAbsolutePath)\n",
    "def makePath(parent: Path, name: String): Path = FileSystems.getDefault().getPath(parent.toString, name)\n",
    "\n",
    "case class DataDirectoryServer(destinationDirectoryPath: Path, sourceRootPath: Path) extends Runnable {\n",
    "\n",
    "  def run: Unit = try {\n",
    "    val sources = getSourcePaths(sourceRootPath)\n",
    "    if (sources.size == 0) throw DataServerError(s\"No sources for path $sourceRootPath!\")\n",
    "\n",
    "    sources.zipWithIndex.foreach { case (source, index) =>\n",
    "      val destination = makePath(destinationDirectoryPath, source.getFileName.toString)\n",
    "      println(s\"\\nIteration ${index+1}: destination: ${destination}\")\n",
    "      Files.copy(source, destination)\n",
    "      Thread.sleep(sleepIntervalMillis)\n",
    "    }\n",
    "  } catch {\n",
    "    case NonFatal(ex) => throw DataServerError(\"Data serving failed!\", ex)\n",
    "  }\n",
    "\n",
    "  /**\n",
    "   * Get the paths for the source files.\n",
    "   */\n",
    "  protected def getSourcePaths(sourcePath: Path): Seq[Path] =\n",
    "    Files.find(sourcePath, 5,\n",
    "      new BiPredicate[Path, BasicFileAttributes]() {\n",
    "        def test(path: Path, attribs: BasicFileAttributes): Boolean = attribs.isRegularFile\n",
    "      }).iterator.asScala.toSeq\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Here is the Spark code for processing the stream. Start by creating the `StreamingContext`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "sc = org.apache.spark.SparkContext@31da4833\n",
       "ssc = org.apache.spark.streaming.StreamingContext@79b3680f\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "org.apache.spark.streaming.StreamingContext@79b3680f"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import org.apache.spark.streaming.{Seconds, StreamingContext}\n",
    "import org.apache.spark.streaming.scheduler.{\n",
    "  StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}\n",
    "\n",
    "val sc = spark.sparkContext\n",
    "val ssc = new StreamingContext(sc, Seconds(batchSeconds))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a listener for the end of the stream.\n",
    "\n",
    "> **Note:** We have to repeat import statements because of scoping idiosyncrasies in the way cells are converted to Scala."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined class EndOfStreamListener\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "import org.apache.spark.streaming.StreamingContext\n",
    "import org.apache.spark.streaming.scheduler.{\n",
    "  StreamingListener, StreamingListenerReceiverError, StreamingListenerReceiverStopped}\n",
    "\n",
    "class EndOfStreamListener(sc: StreamingContext) extends StreamingListener {\n",
    "  override def onReceiverError(error: StreamingListenerReceiverError):Unit = {\n",
    "    println(s\"Receiver Error: $error. Stopping...\")\n",
    "    sc.stop()\n",
    "  }\n",
    "  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped):Unit = {\n",
    "    println(s\"Receiver Stopped: $stopped. Stopping...\")\n",
    "  }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "ssc.addStreamingListener(new EndOfStreamListener(ssc))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now add the logic to process to the data.\n",
    "\n",
    "We do _Word Count_, splitting on non-alphabetic characters."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "lines = org.apache.spark.streaming.dstream.MappedDStream@75d6a390\n",
       "words = org.apache.spark.streaming.dstream.FlatMappedDStream@3879ca1b\n",
       "pairs = org.apache.spark.streaming.dstream.MappedDStream@299e0184\n",
       "wordCounts = org.apache.spark.streaming.dstream.ShuffledDStream@47565b39\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "org.apache.spark.streaming.dstream.ShuffledDStream@47565b39"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "val lines = ssc.textFileStream(watchedDirectory.getAbsolutePath)\n",
    "val words = lines.flatMap(line => line.split(\"\"\"[^\\p{IsAlphabetic}]+\"\"\"))\n",
    "val pairs = words.map(word => (word, 1))\n",
    "val wordCounts = pairs.reduceByKey(_ + _)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling print will cause some useful diagnostic output to be printed during every mini-batch:\n",
    "\n",
    "```text\n",
    "-------------------------------------------\n",
    "Time: 1413724627000 ms\n",
    "-------------------------------------------\n",
    "(limitless,2)\n",
    "(grand,2)\n",
    "(someone,4)\n",
    "(priority,2)\n",
    "(goals,1)\n",
    "(ll,5)\n",
    "(agree,1)\n",
    "(offer,2)\n",
    "(yahoo,3)\n",
    "(ebook,3)\n",
    "...\n",
    "```\n",
    "\n",
    "The time stamp will increment by 2000 ms each time, because we're running with 2-second batch intervals (or whatever you set `batchSeconds` to above). This particular output comes from the `print` method we added above, which is a useful debug tool for seeing the first 10 or so values in the current batch `RDD`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "wordCounts.print()  // print a few counts..."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Calling `saveAsTextFile` will cause new directories to be written under the `outputPath` directory, one new directory per mini-batch. They have names like `output/wc-streaming-1413724628000.out`, with a timestamp appended to our default output argument `output/wc-streaming`, and the extension we add, `out`. Each of these will contain the usual `_SUCCESS` and `part-0000N` files, one for each core that the task is given."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Generates a separate subdirectory for each interval!!\n",
    "wordCounts.saveAsTextFiles(outputPath.getAbsolutePath, \"out\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now start the background thread:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Iteration 1: destination: /home/jovyan/notebooks/tmp/streaming-input/0003.2004-08-01.BG.spam.txt\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "directoryServerThread = Thread[Thread-22,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Thread[Thread-22,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Iteration 2: destination: /home/jovyan/notebooks/tmp/streaming-input/0018.2004-08-03.BG.spam.txt\n",
      "\n",
      "Iteration 3: destination: /home/jovyan/notebooks/tmp/streaming-input/0031.2001-08-03.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 4: destination: /home/jovyan/notebooks/tmp/streaming-input/0016.2001-07-06.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 5: destination: /home/jovyan/notebooks/tmp/streaming-input/0024.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 6: destination: /home/jovyan/notebooks/tmp/streaming-input/0009.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 7: destination: /home/jovyan/notebooks/tmp/streaming-input/0018.2001-07-13.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 8: destination: /home/jovyan/notebooks/tmp/streaming-input/0024.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 9: destination: /home/jovyan/notebooks/tmp/streaming-input/0029.2004-08-03.BG.spam.txt\n",
      "\n",
      "Iteration 10: destination: /home/jovyan/notebooks/tmp/streaming-input/0003.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 11: destination: /home/jovyan/notebooks/tmp/streaming-input/0037.2001-08-05.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 12: destination: /home/jovyan/notebooks/tmp/streaming-input/0004.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 13: destination: /home/jovyan/notebooks/tmp/streaming-input/0019.2001-07-06.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 14: destination: /home/jovyan/notebooks/tmp/streaming-input/0038.2001-08-05.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 15: destination: /home/jovyan/notebooks/tmp/streaming-input/0029.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 16: destination: /home/jovyan/notebooks/tmp/streaming-input/0014.2003-12-19.GP.spam.txt\n",
      "\n",
      "Iteration 17: destination: /home/jovyan/notebooks/tmp/streaming-input/0023.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 18: destination: /home/jovyan/notebooks/tmp/streaming-input/0007.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 19: destination: /home/jovyan/notebooks/tmp/streaming-input/0007.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 20: destination: /home/jovyan/notebooks/tmp/streaming-input/0032.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 21: destination: /home/jovyan/notebooks/tmp/streaming-input/0008.2001-06-12.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 22: destination: /home/jovyan/notebooks/tmp/streaming-input/0016.2001-07-05.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 23: destination: /home/jovyan/notebooks/tmp/streaming-input/0026.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 24: destination: /home/jovyan/notebooks/tmp/streaming-input/0016.2003-12-19.GP.spam.txt\n",
      "\n",
      "Iteration 25: destination: /home/jovyan/notebooks/tmp/streaming-input/0036.2001-08-05.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 26: destination: /home/jovyan/notebooks/tmp/streaming-input/0006.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 27: destination: /home/jovyan/notebooks/tmp/streaming-input/0006.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 28: destination: /home/jovyan/notebooks/tmp/streaming-input/0025.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 29: destination: /home/jovyan/notebooks/tmp/streaming-input/0005.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 30: destination: /home/jovyan/notebooks/tmp/streaming-input/0005.2001-06-23.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 31: destination: /home/jovyan/notebooks/tmp/streaming-input/0015.2003-12-19.GP.spam.txt\n",
      "\n",
      "Iteration 32: destination: /home/jovyan/notebooks/tmp/streaming-input/0013.2001-06-30.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 33: destination: /home/jovyan/notebooks/tmp/streaming-input/0028.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 34: destination: /home/jovyan/notebooks/tmp/streaming-input/0014.2001-07-04.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 35: destination: /home/jovyan/notebooks/tmp/streaming-input/0011.2001-06-29.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 36: destination: /home/jovyan/notebooks/tmp/streaming-input/0025.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 37: destination: /home/jovyan/notebooks/tmp/streaming-input/0002.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 38: destination: /home/jovyan/notebooks/tmp/streaming-input/0008.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 39: destination: /home/jovyan/notebooks/tmp/streaming-input/0012.2003-12-19.GP.spam.txt\n",
      "\n",
      "Iteration 40: destination: /home/jovyan/notebooks/tmp/streaming-input/0022.2004-08-03.BG.spam.txt\n",
      "\n",
      "Iteration 41: destination: /home/jovyan/notebooks/tmp/streaming-input/0029.2001-08-03.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 42: destination: /home/jovyan/notebooks/tmp/streaming-input/0008.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 43: destination: /home/jovyan/notebooks/tmp/streaming-input/0002.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 44: destination: /home/jovyan/notebooks/tmp/streaming-input/0022.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 45: destination: /home/jovyan/notebooks/tmp/streaming-input/0030.2004-08-03.BG.spam.txt\n",
      "\n",
      "Iteration 46: destination: /home/jovyan/notebooks/tmp/streaming-input/0010.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 47: destination: /home/jovyan/notebooks/tmp/streaming-input/0020.2004-08-02.BG.spam.txt\n",
      "\n",
      "Iteration 48: destination: /home/jovyan/notebooks/tmp/streaming-input/0002.2001-05-25.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 49: destination: /home/jovyan/notebooks/tmp/streaming-input/0010.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 50: destination: /home/jovyan/notebooks/tmp/streaming-input/0030.2001-08-01.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 51: destination: /home/jovyan/notebooks/tmp/streaming-input/0020.2001-07-28.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 52: destination: /home/jovyan/notebooks/tmp/streaming-input/0030.2003-12-21.GP.spam.txt\n",
      "\n",
      "Iteration 53: destination: /home/jovyan/notebooks/tmp/streaming-input/0017.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 54: destination: /home/jovyan/notebooks/tmp/streaming-input/0027.2004-08-02.BG.spam.txt\n",
      "\n",
      "Iteration 55: destination: /home/jovyan/notebooks/tmp/streaming-input/0026.2003-12-18.GP.spam.txt\n",
      "\n",
      "Iteration 56: destination: /home/jovyan/notebooks/tmp/streaming-input/0011.2001-06-28.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 57: destination: /home/jovyan/notebooks/tmp/streaming-input/0017.2004-08-01.BG.spam.txt\n",
      "\n",
      "Iteration 58: destination: /home/jovyan/notebooks/tmp/streaming-input/0026.2001-07-13.SA_and_HP.spam.txt\n",
      "\n",
      "Iteration 59: destination: /home/jovyan/notebooks/tmp/streaming-input/0024.2004-08-02.BG.spam.txt\n",
      "\n",
      "Iteration 60: destination: /home/jovyan/notebooks/tmp/streaming-input/0032.2003-12-22.GP.spam.txt\n",
      "\n",
      "Iteration 61: destination: /home/jovyan/notebooks/tmp/streaming-input/0034.2004-08-03.BG.spam.txt\n"
     ]
    }
   ],
   "source": [
    "val directoryServerThread = new Thread(new DataDirectoryServer(makePath(watchedDirectory), makePath(dataSource)))\n",
    "directoryServerThread.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start the streaming process and wait forever. To have it exit after a certain number of milliseconds, pass a number for the milliseconds as the argument to `awaitTermination`. \n",
    "\n",
    "We'll wrap this in a separate thread so we can retain some control for stopping everything."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "streamRunnable = $anon$1@17c9de6c\n",
       "streamThread = Thread[Thread-23,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Thread[Thread-23,5,restricted-b87dcd55-e2f0-41e7-982e-fe2d581c6bcd]"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------------------------------------------\n",
      "Time: 1527527578000 ms\n",
      "-------------------------------------------\n",
      "(amnis,2)\n",
      "(crappie,1)\n",
      "(someone,8)\n",
      "(goals,2)\n",
      "(ll,11)\n",
      "(agree,2)\n",
      "(xeni,2)\n",
      "(forex,2)\n",
      "(greater,1)\n",
      "(order,8)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527580000 ms\n",
      "-------------------------------------------\n",
      "(sent,1)\n",
      "(blood,1)\n",
      "(this,2)\n",
      "(near,1)\n",
      "(bone,1)\n",
      "(healing,1)\n",
      "(angrily,1)\n",
      "(upon,1)\n",
      "(dish,1)\n",
      "(self,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527582000 ms\n",
      "-------------------------------------------\n",
      "(coverage,1)\n",
      "(amnis,1)\n",
      "(play,1)\n",
      "(profit,1)\n",
      "(stock,3)\n",
      "(this,4)\n",
      "(starting,1)\n",
      "(is,2)\n",
      "(its,1)\n",
      "(otcbb,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527584000 ms\n",
      "-------------------------------------------\n",
      "(tantalum,1)\n",
      "(under,2)\n",
      "(health,1)\n",
      "(its,10)\n",
      "(arrange,2)\n",
      "(ivernia,2)\n",
      "(ore,1)\n",
      "(forecasts,2)\n",
      "(have,4)\n",
      "(line,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527586000 ms\n",
      "-------------------------------------------\n",
      "(someone,3)\n",
      "(ll,7)\n",
      "(agree,1)\n",
      "(offer,10)\n",
      "(shot,1)\n",
      "(pages,3)\n",
      "(opening,1)\n",
      "(ark,1)\n",
      "(improve,1)\n",
      "(counted,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527588000 ms\n",
      "-------------------------------------------\n",
      "(right,1)\n",
      "(this,4)\n",
      "(ll,1)\n",
      "(fraud,1)\n",
      "(offer,1)\n",
      "(click,3)\n",
      "(under,1)\n",
      "(have,1)\n",
      "(upon,1)\n",
      "(plain,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527590000 ms\n",
      "-------------------------------------------\n",
      "(sponsoring,1)\n",
      "(created,1)\n",
      "(someone,3)\n",
      "(ll,4)\n",
      "(offer,3)\n",
      "(pages,6)\n",
      "(include,2)\n",
      "(order,1)\n",
      "(type,3)\n",
      "(modification,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527592000 ms\n",
      "-------------------------------------------\n",
      "(under,2)\n",
      "(this,4)\n",
      "(starting,1)\n",
      "(modem,1)\n",
      "(its,2)\n",
      "(click,2)\n",
      "(combo,1)\n",
      "(gigabyte,1)\n",
      "(usb,1)\n",
      "(self,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527594000 ms\n",
      "-------------------------------------------\n",
      "(lapel,1)\n",
      "(vicodin,1)\n",
      "(mayor,1)\n",
      "(label,1)\n",
      "(fp,1)\n",
      "(pow,1)\n",
      "(baritone,1)\n",
      "(portrayal,1)\n",
      "(day,1)\n",
      "(matchbook,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527596000 ms\n",
      "-------------------------------------------\n",
      "(sweet,3)\n",
      "(waggons,1)\n",
      "(banner,2)\n",
      "(someone,1)\n",
      "(call,1)\n",
      "(goals,2)\n",
      "(ll,4)\n",
      "(offer,1)\n",
      "(bade,1)\n",
      "(country,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527598000 ms\n",
      "-------------------------------------------\n",
      "(means,2)\n",
      "(right,1)\n",
      "(this,6)\n",
      "(package,1)\n",
      "(past,1)\n",
      "(click,2)\n",
      "(acceptable,1)\n",
      "(writing,1)\n",
      "(have,4)\n",
      "(here,2)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527600000 ms\n",
      "-------------------------------------------\n",
      "(lis,1)\n",
      "(free,1)\n",
      "(are,1)\n",
      "(have,1)\n",
      "(fast,1)\n",
      "(order,1)\n",
      "(here,1)\n",
      "(with,1)\n",
      "(we,2)\n",
      "(topics,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527602000 ms\n",
      "-------------------------------------------\n",
      "(receiving,1)\n",
      "(call,1)\n",
      "(this,3)\n",
      "(abuse,1)\n",
      "(myspyerase,1)\n",
      "(is,3)\n",
      "(swarming,1)\n",
      "(previously,1)\n",
      "(click,1)\n",
      "(download,2)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527604000 ms\n",
      "-------------------------------------------\n",
      "(studio,1)\n",
      "(right,1)\n",
      "(country,1)\n",
      "(this,1)\n",
      "(youdo,1)\n",
      "(click,2)\n",
      "(finland,1)\n",
      "(have,3)\n",
      "(africa,1)\n",
      "(licensing,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527606000 ms\n",
      "-------------------------------------------\n",
      "(banner,2)\n",
      "(created,1)\n",
      "(paper,1)\n",
      "(ll,3)\n",
      "(health,2)\n",
      "(offer,1)\n",
      "(its,1)\n",
      "(reading,1)\n",
      "(blocks,1)\n",
      "(bright,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527608000 ms\n",
      "-------------------------------------------\n",
      "(solving,1)\n",
      "(package,1)\n",
      "(dispatch,1)\n",
      "(anxiety,1)\n",
      "(zoloft,1)\n",
      "(health,1)\n",
      "(loo,1)\n",
      "(pain,1)\n",
      "(tablet,1)\n",
      "(xenical,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527610000 ms\n",
      "-------------------------------------------\n",
      "(under,1)\n",
      "(country,4)\n",
      "(offer,1)\n",
      "(proceed,1)\n",
      "(event,1)\n",
      "(materially,1)\n",
      "(have,2)\n",
      "(added,1)\n",
      "(simulator,1)\n",
      "(chrome,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527612000 ms\n",
      "-------------------------------------------\n",
      "(spigot,1)\n",
      "(doctoral,1)\n",
      "(this,14)\n",
      "(country,6)\n",
      "(offer,1)\n",
      "(aphrodite,1)\n",
      "(brim,1)\n",
      "(arrested,1)\n",
      "(general,1)\n",
      "(transferred,1)\n",
      "...\n",
      "\n",
      "-------------------------------------------\n",
      "Time: 1527527614000 ms\n",
      "-------------------------------------------\n",
      "(qnijnh,1)\n",
      "(zeal,1)\n",
      "(right,1)\n",
      "(offer,1)\n",
      "(is,1)\n",
      "(ocd,1)\n",
      "(currently,1)\n",
      "(click,1)\n",
      "(contravention,1)\n",
      "(have,2)\n",
      "...\n",
      "\n"
     ]
    }
   ],
   "source": [
    "val streamRunnable = new Runnable {\n",
    "  def run(): Unit = {\n",
    "    ssc.start()\n",
    "    ssc.awaitTermination()\n",
    "  }\n",
    "}\n",
    "val streamThread = new Thread(streamRunnable)\n",
    "streamThread.start()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Evaluate the next cell to stop the serving thread and streaming process. (If the cell evaluation hangs, stop or reset the kernel to kill it.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "warning: there were two deprecation warnings; re-run with -deprecation for details\n"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "directoryServerThread.stop()\n",
    "ssc.stop(stopSparkContext = true)\n",
    "streamThread.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When finished with it, clean up the watched directory..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmrf(watchedDirectory)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "... and the streaming output directory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "rmrf(outputPathRoot)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Apache Toree - Scala",
   "language": "scala",
   "name": "apache_toree_scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
